#!/usr/bin/perl -ws use Digest::MD5 qw/md5_base64/; use Digest::HMAC_SHA1 qw/hmac_sha1_hex/; use MIME::Base64; use Net::HTTP; use Time::Local; use Fcntl qw(:flock); use Errno qw(EINTR EAGAIN); use constant VERSION => '24.04.29'; use constant CACHE_FORMAT => '1'; use constant EXTRA_DEBUG => 0; use constant LOG_CONF_FILE => 0; use constant API_HOST => 'cp.ahcdn.com'; use constant API_URI => '/api2/'; use constant FORK_DELAY => 1; die " VERSION: ".VERSION." USAGE: import_files.pl [options] /path/to/conf/dir1/ [path/to/conf/dir2/file.conf ...] Conf dirs must be writeable - there will be kept work files, like vCDN files lists, lock files etc. All *.conf files are interpreted as active import config. To disable some config rename its file to something like name.conf.off OPTIONS: -fork[=1] - process all origins in parallel =4 - fork at most 4 processes -error_log=path/to/error.log - /dev/stderr -action_log=path/to/action.log - /dev/stdout -info_log=path/to/info_debug.log - /dev/stderr -n|-dry_run - do not perform actual actions (it still updates db-cache files if necessary) -d|-debug - print additional debug info (to info_log) -s|-silent - supress 'info' messages (to info_log) -h|-help - print this help CONFIG FILE PARAMETERS: # It is a comment. Empty lines are ignored too. # # boolean values may take only values from list (1, 0, on, off, true, false, t, f) # # time intervals can use suffixes s(econds) [default], m(inutes), h(ours), d(ays), w(eeks) # for example: 1m30s, 4h30m # # sizes can be specified using suffixes [kK], [mM], [gG], [tT] # for example: 1G200M, 1.5MB login = loginname - login to use for https://cp.ahcdn.com/api2/file/ password = password - its password for 'Authorization: value' refresh_active = 1d - refresh 'active' files list every 24 hours [monitor_conf_update = false] - if 'true' - exit if config file change size or inode # global error/action/info log is used if one is not specified in config [error_log = path/to/error.log] - log for errors [action_log = path/to/action.log] - log for all actions (add/del/rm etc) [info_log = path/to/info_debug.log] - log for info/debug messages scan_url = rsync://[user\@pass:]host/modulepath/ = swift://user:tenant:key\@host/v3/container/ = s3://access_key:secret_key\@host/bucket/ = /path/to/content/dir/ [scan_exclude = keyword1 [keyword2 ..]] - keywords to use in --exclude for `rsync` [scan_interval = 30m] - issue scanning each 30 minutes [scan_min_age = 10m] - ignore files which are younger than 10 minutes used to avoid importing of partially uploaded files forced to '0' for swift:// and s3:// sources due atomic updates [scan_max_age = 0d] - ignore too old files (reduce memory usage) [min_size = 1M] - only files greater than 1MB will be imported [max_add = 0] - if >0 - do not add/update more files per run [max_del = 0] - if >0 - do not delete more files per run (see 'delete_missing') [max_rm = 0] - if >0 - do not remove more files per run (see 'delete_imported') [fetch_ext = mp4 mkv flv] - other extentions will be ignored [fetch_url = http://[user\@pass:]host/[dir/]] - url to fetch new files from if not specified - scan_url is used [fetch_verify = true] - verify that added file can be actually fetched [fetch_sign_method = nginx_securelink|swift_hmac_sha1] [fetch_sign_securelink_argument = md5] [fetch_sign_key = secretvalue] - 'secret\$uri\$arg_time' for securelink (\$remote_addr not supported) [fetch_sign_ttl = ttl_value] [fetch_prio = 0] [callback = http://mysite.com/cdn_import_callback.php?file=%FILE%&other_options] - see /api2/file/manual?method=add # next three parameters are applied in order: filter_re, cut_folders, post_re, add_folder [filter_re = RE] - perl RE to filter files, used in form m#\$RE# for example, value '(720|1080)p.mp4' (without quotes) [cut_folders = 0] - 0 - keep full path 'path/to/file.mp4' (default) 1 - leave 'to/file.mp4' (cut first dir) 2 - leave 'file.mp4' (cut first two dirs) 3 - ignore 'path/to/file.mp4', but add 'dir1/dir2/dir3/dir4.mp4' as 'dir4/file.mp4' -1 - delete all folders, leave just filename [post_re = s#from#to#ig] - perl substitute RE to apply on filename non-matching files will be ignored!! [add_folder = a/b] - add file path/to/file.mp4 as a/b/path/to/file.mp4 into vCDN [delete_imported = false] - if 'true' - delete alredy imported files from origin can not be set to 'true' together with 'delete_missing [delete_missing = false] - if 'true' - delete files from cdn which are missing on origin [update_only = false] - if 'true' - do not add new files, just update modified ones " if $h or $help or $#ARGV == -1; sub parse_time($); sub parse_size($); sub read_config($); sub read_configs(@); sub sort_configs(%); sub update_check_time($); sub log_action($@); sub log_message($@); sub log_info(@); sub log_debug(@); sub log_error(@); sub log_fatal(@); sub try_lock($); sub shell_escape($); sub www_escape($); sub open_scanner(); sub setup(); sub read_api($$); sub cb_read_nonactive($); sub cb_read_active($); sub write_file_list($$); sub read_active_cached($); sub read_active_files(); sub read_added_files(); sub check_conf_update(); sub rm_file($); sub verify_fetch($$); sub generate_signature($); sub add_file($$$); sub del_file($); sub call_api($%); sub finish(); sub process_origin(); sub scan_origin(); sub process_file($$$$$$$$); ## start of main programm $ENV{PATH} = '/bin:/usr/bin:/usr/local/bin'; $ENV{LANG} = ''; $ENV{LC_ALL} = 'C'; $DEBUG = ($d or $debug) ? 1 : 0; print STDERR "Enable EXTRA_DEBUG in code for extra debug messages\n" if $DEBUG and not EXTRA_DEBUG; $SILENT = ($s or $silent) ? 1 : 0; $DRY_RUN = ($n or $dry_run) ? 1 : 0; $ERROR_LOG = $error_log // '/dev/stderr'; $ACTION_LOG = $action_log // '/dev/stdout'; $INFO_LOG = $info_log // '/dev/stderr'; $FORK = $fork // 0; %CONF = (error_log => $ERROR_LOG, action_log => $ACTION_LOG, info_log => $INFO_LOG, conf_file => 'main program'); select STDERR; $| = 1; select STDOUT; $| = 1; if(0) { # supress warnings about unused variables print $info_log + $action_log + $error_log + $help + $dry_run + $silent + $debug + $fork + $force + $d + $s + $n + $h; } my %configs = read_configs(@ARGV); log_info('Read', scalar(keys %configs), 'configs'); my @conf_files = sort_configs(%configs); my($pid, $child_count) = (1, 0); for my $conf (@conf_files) { update_check_time($configs{$conf}); # check if conf is already processed %CONF = %{$configs{$conf}}; if($CONF{next_check} > time() and ! $DEBUG and ! $force) { log_debug('Skip for next', time()-$CONF{next_check}, 'seconds'); next; } my $L = try_lock($CONF{lock_file}); if(! $L) { log_debug("Already locked, skipping it.."); next; } if($FORK) { if($FORK > 1) { while($child_count >= $FORK) { if(wait() > 0) { $child_count--; } else { $child_count = 0; } } } else { sleep(FORK_DELAY); } $pid = fork(); $child_count ++; next if $pid > 0; } log_info("process $CONF{login} $conf $CONF{scan_url}"); process_origin(); } continue { close $L if $L; exit() if $FORK and $pid == 0; } exit(); ## end of main programm sub parse_time($) { my $time = shift; my $ret = 0; if($time !~ /^(?:[0-9](?:\.[0-9])?+[sSmMhHdDwW]?)+$/) { log_fatal("Unable to decode time parameter '$time'"); } while($time =~ /([0-9]+(?:\.[0-9])?)([sSmMhHdDwW]?)/g) { if(! $2 or $2 eq 's' or $2 eq 'S') { $ret += 0+ $1; } elsif($2 eq 'm' or $2 eq 'M') { $ret += 0+ $1 * 60; } elsif($2 eq 'h' or $2 eq 'H') { $ret += 0+ $1 * 60*60; } elsif($2 eq 'd' or $2 eq 'D') { $ret += 0+ $1 * 60*60*24; } elsif($2 eq 'w' or $2 eq 'W') { $ret += 0+ $1 * 60*60*24*7; } } log_debug("Decode time '$time' to '$ret'") if EXTRA_DEBUG; return $ret; } sub parse_size($) { my $size = shift; my $ret = 0; if($size !~ /^(?:[0-9]+(?:\.[0-9]+)?[kKmMgGtT]?[bB]?)+$/) { log_fatal("Unable to decode size parameter '$size'"); } while($size =~ /([0-9]+(?:\.[0-9]+)?)([kKmMgGtT]?)[bB]?/g) { if(! $2) { $ret += int($1); } elsif($2 eq 'k' or $2 eq 'K') { $ret += int($1 * 1024); } elsif($2 eq 'm' or $2 eq 'M') { $ret += int($1 * 1024*1024); } elsif($2 eq 'g' or $2 eq 'G') { $ret += int($1 * 1024*1024*1024); } elsif($2 eq 't' or $2 eq 'T') { $ret += int($1 * 1024*1024*1024*1024); } } log_debug("Decode size '$size' to '$ret'") if EXTRA_DEBUG; return $ret; } sub read_config($) { my $config_file = shift; log_debug("Reading config '$config_file'"); my %conf = ( # default values: filter_re => '', cut_folders => 0, post_re => '', add_folder => '', refresh_active => '1d', min_size => '1MB', fetch_ext => 'mp4 mkv flv', scan_min_age => '10m', scan_interval => '30m', fetch_sign_method => '', fetch_sign_securelink_argument => 'md5', fetch_verify => 1, ); open(my $C, '<', $config_file) or log_fatal("Fail to open '$config_file': $!"); while(<$C>) { s/\s+#.+//; # delete comments next if /^#/ or /^\s+$/; # skip comments and empty lines if(/^\s*(\S+)\s*=\s*(.*)/) { $conf{$1} = $2; log_debug("Read '$1' => '$2'") if EXTRA_DEBUG; } else { log_error("Unparseable line in '$config_file': $_"); return; } } my @stat = stat($C); $conf{conf_inode} = $stat[1]; $conf{conf_size} = $stat[7]; $conf{conf_mtime} = $stat[9]; close($C); $conf{conf_file} = $config_file; # protect 'conf_file' from redefining in config file $conf{lock_file} = $conf{conf_file} . '.lock'; for my $param (qw/login password scan_url/) { if(! $conf{$param}) { log_error("Not set parameter '$param' in '$config_file'"); return; } } for my $param (qw/delete_imported delete_missing update_only fetch_verify/) { next if ! exists $conf{$param}; if($conf{$param} =~ /^(?:1|true|t|on)$/i) { $conf{$param} = 1; } elsif($conf{$param} =~ /^(?:0|false|f|off)$/i) { $conf{$param} = 0; } else { log_error("Non boolean value in '$config_file' of '$param': $conf{$param}"); return; } } for my $param (qw/scan_min_age scan_max_age scan_interval refresh_active/) { $conf{$param} = exists $conf{$param} ? parse_time($conf{$param}) : 0; } for my $param (qw/min_size/) { $conf{$param} = exists $conf{$param} ? parse_size($conf{$param}) : 0; } for my $param (qw/fetch_ext/) { for my $val (split /\s+/, $conf{$param}) { $conf{$param}->{$val} = undef; } } if($conf{delete_imported} and $conf{scan_max_age}) { log_error("Can not use 'delete_imported' = $conf{delete_imported} and 'scan_max_age' = $conf{scan_max_age} in $config_file"); return; } $conf{error_log} ||= $ERROR_LOG; $conf{action_log} ||= $ACTION_LOG; $conf{info_log} ||= $INFO_LOG; if($conf{filter_re}) { $conf{filter_re_m} = qr/$conf{filter_re}/; } if($conf{post_re}) { if($conf{post_re} =~ /`/) { log_error("Symbol '`' is not allowed in 'post_re' in '$config_file'"); return; } if($conf{post_re} !~ /^s([#\/'"])(.+?)\1.*\1([ig]{0,2})$/) { log_error("Bad 'post_re' parameter ($conf{post_re}) in '$config_file'"); return; } if(! $conf{filter_re}) { my($re, $mod) = ($2, $3); if($mod =~ /i/) { $conf{filter_re_m} = qr/$re/i; } else { $conf{filter_re_m} = qr/$re/; } } } if($conf{cut_folders} and $conf{cut_folders} !~ /^-?[0-9]{1,99}$/) { log_error("Bad value '$conf{cut_folders}' of 'cut_folders' in $config_file"); return; } $conf{scan_min_age} = 0 if $conf{scan_url} =~ /^(?:swift|s3):/; $conf{fetch_url} .= '/' if $conf{fetch_url} =~ /[a-zA-Z0-9_]$/; $conf{scan_url} .= '/' if $conf{scan_url} !~ /\/$/; $conf{fetch_sign_method} = '' if $conf{fetch_sign_method} eq 'none'; $conf{fetch_sign_securelink_argument} = 'temp_url_sig' if $conf{fetch_sign_method} eq 'swift_hmac_sha1'; log_debug("Read config", map { $_ => $conf{$_} } sort keys %conf) if EXTRA_DEBUG; return %conf; } sub read_configs(@) { my %configs; while(my $config_entry = shift) { if(-f $config_entry) { # it should be a config file my %conf = read_config($config_entry); next if ! exists $conf{conf_file}; $configs{$config_entry} = { %conf }; next; } log_fatal("Not a directory '$config_entry'") if ! -d $config_entry; opendir(my $D, $config_entry) or log_fatal("Fail to open config dir '$config_entry': $!"); while(my $config_file = readdir($D)) { next if $config_file !~ /.+\.conf$/; my %conf = read_config("$config_entry/$config_file"); next if ! exists $conf{conf_file}; $configs{"$config_entry/$config_file"} = { %conf }; } closedir($D); } log_fatal("No configs") if scalar keys %configs == 0; return %configs; } sub update_check_time($) { my $href = shift; my $lock_file = $href->{lock_file}; if(-f $lock_file) { $href->{next_check} = (stat($lock_file))[9] + $href->{scan_interval}; # mtime } else { log_info("Fresh config '$href->{conf_file}'"); $href->{next_check} = 0; } } sub sort_configs(%) { my %configs = @_; while(my($conf, $href) = each %configs) { update_check_time($href); log_debug("Next check of '$conf' at", scalar(localtime($href->{next_check}))); } return sort { $configs{$a}->{next_check} <=> $configs{$b}->{next_check} } keys %configs; } sub log_action($@) { my $action = shift; log_fatal("No file on action '$action'") if $#_ == -1; my $message = join "\t", scalar(localtime()), $action, @_; $STAT{"action $action"} ++; my $log_add; if($action eq 'add') { log_fatal("No size on 'add' of '$_[0]'") if $#_ == 0; $log_add = "$_[1]\t$_[0]"; } elsif($action eq 'delete') { $log_add = "-\t$_[0]"; } elsif($action eq 'rm') { } else { log_fatal("Unknown action '$message'"); } if($log_add) { my $log_file = $CONF{conf_file} . ".added"; my $FH; if(! open $FH, '>>', $log_file) { log_fatal("Fail to open add log '$log_file': $!"); } if(! print $FH $log_add, "\n") { log_fatal("Fail to print to add log '$log_file' message '$log_add': $!"); } if(! close $FH) { log_fatal("Fail to close add log '$log_file' message '$log_add': $!"); } } my $FH; if(! open $FH, '>>', $CONF{action_log}) { log_fatal("Fail to open action log '$CONF{action_log}': $!"); } if(! print $FH $message, "\n") { log_fatal("Fail to print to action log '$CONF{action_log}' message '$message': $!"); } if(! close $FH) { log_fatal("Fail to close action log '$CONF{action_log}' message '$message': $!"); } log_info("'$action' @_"); } sub log_message($@) { my $log = shift; my $message = "\t@_\n"; $message = "\t" . $CONF{conf_file} . $message if LOG_CONF_FILE; $message = scalar(localtime()) . $message; my $FH; if($log eq '/dev/stderr') { if(! open $FH, '>>&', 'STDERR') { die "Fail to dup() log '$log' for message '$message': $!"; } } elsif($log eq '/dev/stdout') { if(! open $FH, '>>&', 'STDOUT') { die "Fail to dup() log '$log' for message '$message': $!"; } } else { print STDERR $message; if(! open $FH, '>>', $log) { die "Fail to open log '$log' for message '$message': $!"; } } if(! print $FH $message) { die "Fail to print to log '$log' message '$message': $!"; } if(! close $FH) { die "Fail to close log '$log' message '$message': $!"; } } sub log_info(@) { return if $SILENT; log_message($CONF{info_log}, @_); } sub log_debug(@) { return if ! $DEBUG or $SILENT; log_message($CONF{info_log}, @_); } sub log_error(@) { log_message($CONF{error_log}, @_); } sub log_fatal(@) { log_message($CONF{error_log}, @_); exit(1); } sub try_lock($) { my $lock_file = shift; my($L, $mode); $mode = -f $lock_file ? '+<' : '>>'; open($L, $mode, $lock_file) or log_fatal("Fail to open lockfile '$lock_file': $!"); if(flock($L, LOCK_EX | LOCK_NB)) { log_debug("Locked file $lock_file"); return $L; } log_debug("Fail to lock file $lock_file: $!"); close $L; return; } sub shell_escape($) { my $str = shift; $str = '' if not defined $str; if($str) { $str =~ s/([`"\\\$])/\\$1/g; } return('"' . $str . '"'); } sub www_escape($) { my $str = shift; my $ret; while(length($str) > 0) { $ret .= $1 if $str =~ s/^([a-zA-Z0-9_:\/.+-]+)//; $ret .= '%' . join '%', map { unpack 'H2' } split //, $1 if $str =~ s/^([^a-zA-Z0-9_:\/.+-]+)//; } return $ret; } sub open_scanner() { my $command = 'rsync -r --list-only --no-h --timeout=300'; if($CONF{scan_url} =~ /^rsync:\/\/(?:([^@]+)\@)?(\S+)/) { $CONF{scanner} = 'rsync'; $CONF{scan_uri} = "rsync://$2"; if($1) { ($ENV{USER}, $ENV{RSYNC_PASSWORD}) = split /:/, $1, 2; log_fatal("No password in scan_url '$CONF{scan_url}'") if ! $ENV{RSYNC_PASSWORD}; } $CONF{fetch_url} = $CONF{scan_url} if ! $CONF{fetch_url}; } elsif($CONF{scan_url} =~ /^swift:\/\//) { if($CONF{scan_url} =~ /^swift:\/\/([0-9]+):([0-9]+):([a-zA-Z0-9_+-]+)\@([a-z0-9._-]+)\/v3(\/.+)/) { ($CONF{swift_user}, $CONF{swift_tenant}, $CONF{swift_key}) = ($1, $2, $3); ($CONF{swift_host}, $CONF{swift_container}) = ("https://$4/v3", $5); } else { log_fatal("Bad swift:// url '$CONF{scan_url}'"); } my $ls = 'ls'; $CONF{scanner} = 'swift-short'; if($CONF{scan_min_age} or $CONF{scan_max_age}) { $ls = 'lsl'; $CONF{scanner} = 'swift-full'; } $command = "rclone --dry-run --swift-user $CONF{swift_user} --swift-tenant $CONF{swift_tenant} --swift-key $CONF{swift_key} --swift-auth $CONF{swift_host} --swift-domain default --swift-tenant-domain default --swift-auth-version 3 --swift-endpoint-type public $ls " . shell_escape(":swift:$CONF{swift_container}"); log_fatal("Not specified 'fetch_url'") if ! $CONF{fetch_url}; } elsif($CONF{scan_url} =~ /^s3:\/\//) { if($CONF{scan_url} =~ /^s3:\/\/([0-9a-f]+):([0-9a-f]+)\@([a-z0-9._-]+)(\/.+)/) { ($CONF{s3_access_key}, $CONF{s3_secret_key}) = ($1, $2); ($CONF{s3_host}, $CONF{s3_container}) = ("https://$3/", $4); } else { log_fatal("Bad s3:// url '$CONF{scan_url}'"); } my $ls = 'ls'; $CONF{scanner} = 's3-short'; if($CONF{scan_min_age} or $CONF{scan_max_age}) { $ls = 'lsl'; $CONF{scanner} = 's3-full'; } $command = "rclone --dry-run --s3-access-key-id=$CONF{s3_access_key} --s3-secret-access-key=$CONF{s3_secret_key} --s3-endpoint=$CONF{s3_host} $ls " . shell_escape(":s3:$CONF{s3_container}"); $CONF{fetch_url} = $CONF{scan_url} if ! $CONF{fetch_url}; } elsif(-d $CONF{scan_url}) { $CONF{scan_url} .= '/' if $CONF{scan_url} !~ /\/$/; # add trailing '/' if missing $CONF{scanner} = 'rsync'; $CONF{scan_uri} = $CONF{scan_url}; log_fatal("Not specified 'fetch_url'") if ! $CONF{fetch_url}; } else { log_fatal("Bad scan_url '$CONF{scan_url}'"); } $CONF{scan_exclude} .= ' *.jpg *.jpeg *.png *.gif'; for my $param (qw/include exclude/) { next if ! $CONF{"scan_$param"}; for my $word (split /\s+/, $CONF{"scan_$param"}) { next if ! $word; $word = "*$word" if $word !~ /\*/ and $word !~ /^\//; $word = shell_escape($word); $command .= " --$param=$word"; } } if($CONF{scanner} eq 'rsync') { $uri = shell_escape($CONF{scan_uri}); $command .= " $uri /var/empty/"; } open(my $fh, '-|', "nice $command") or log_fatal("Fail to start '$command': $!"); log_debug("Opened scanner at `$command`"); $CONF{scanner_fh} = $fh; } sub setup() { $CONF{api_auth_header} = "basic " . encode_base64("$CONF{login}:$CONF{password}", ''); $CONF{api_ua_header} = "AH vCDN importer v".VERSION; $CONF{cnt_add} = $CONF{cnt_del} = $CONF{cnt_rm} = 0; $CONF{start_time} = time(); %STAT = ( 'already added' => 0, 'already imported' => 0, 'line processed' => 0, 'cached added' => 0, 'cached active' => 0, ); update_procline(); } sub update_procline() { my $added = $STAT{'already added'} + $STAT{'already imported'}; my $processed = $STAT{'line processed'}; my $cached_added = $STAT{'cached added'}; my $cached_active = $STAT{'cached active'}; $0 = "import_files $CONF{cnt_add}/$added/$processed ($cached_added/$cached_active) $CONF{login} $CONF{conf_file} $CONF{scan_url}"; } sub read_api($$) { my($uri, $cb) = @_; log_debug("Read api data from '$uri'"); $uri = API_URI . $uri if substr($uri, length(API_URI)) ne API_URI; my $ua = Net::HTTP->new(Host => API_HOST, Connection => 'close'); if(! $ua->write_request(GET => $uri, 'User-Agent' => $CONF{api_ua_header}, 'Authorization' => $CONF{api_auth_header})) { log_fatal("Fail to call API '$uri': $!"); } my($code, $mess, %headers) = $ua->read_response_headers(); if($code != 200) { my @hh = %headers; log_fatal("Unable to fetch non-active files list from vCDN api: $code $mess (@hh)"); } log_debug("Got response $code $mess", %headers); my($buf_acc, $buf_read, $lines_read, $lines_reported, $header_read) = ('', '', 0, -1, 0); while(1) { my $n = $ua->read_entity_body($buf_read, 65536); next if $n == -1 and ($!{EINTR} or $!{EAGAIN}); if(not defined $n or $n == -1) { log_fatal("Fail to read non-active files list from vCDN api: $!") } if($n == 0) { if(length($buf_acc) > 0) { log_fatal("Reached EOF on non-empty buf while reading non-active files list from vCDN api, buf content is '$buf_acc'"); } last; } $buf_acc .= $buf_read; my $last_pos = 0; while($buf_acc =~ /.*?\n/g) { my $line = substr($buf_acc, $-[0], $+[0]-$-[0]-1); # substr($&, 0, -1) $last_pos = $+[0]; log_debug($line) if EXTRA_DEBUG; if($line =~ /^# /) { if($line =~ /^# ([0-9]+) rows$/) { $lines_reported = $1; } elsif($lines_read == 0 && $header_read == 0) { $header_read = 1; } else { log_fatal("Read unparseable line while reading non-active files list from vCDN api: $line"); } next; } if(! $header_read) { log_fatal("Read '$line' instead of header on calling '$uri'"); } $lines_read ++; &$cb($line); } $buf_acc = substr($buf_acc, $last_pos); # $buf_acc = $'; } $ua->get_trailers; if(length($buf_acc) != 0) { log_fatal("Unparsed data after reading $lines_read (of $lines_reported) on '$uri': $buf_acc"); } if($lines_read != $lines_reported) { log_fatal("Read $lines_read lines instead of $lines_reported on '$uri'"); } log_debug("Read $lines_read entries"); } sub cb_read_nonactive($) { my($status, $size, $name) = split /\t/, $_[0], 3; $STAT{"db: $status"} ++; if($status eq 'deleted' or $status eq 'failed' or $status eq 'blocked') { return; } if($status ne 'new' and $status ne 'replace' and $status ne 'disabled') { log_error("Unknown status '$status' of file '$name'"); return; } if($CONF{add_folder}) { if(length($name) <= length($CONF{add_folder}) or substr($name, 0, length($CONF{add_folder})+1) ne "$CONF{add_folder}/") { log_error("Bad name '$name' (folder '$CONF{add_folder}')"); return; } $name = substr($name, length($CONF{add_folder})+1); } $CONF{file_added}->{$name} = $size; } sub cb_read_active($) { my($size, $name) = split /\t/, $_[0], 2; if($CONF{add_folder}) { if(length($name) <= length($CONF{add_folder}) or substr($name, 0, length($CONF{add_folder})+1) ne "$CONF{add_folder}/") { log_fatal("Read active file '$name' but 'add_folder' is $CONF{add_folder}"); } $name = substr($name, length($CONF{add_folder})+1); } delete $CONF{file_added}->{$name} if exists $CONF{file_added}->{$name}; $CONF{file_active}->{$name} = $size; } sub write_file_list($$) { my($tag, $href) = @_; my $file = $CONF{conf_file} . '.' . $tag; open(my $fh, '>', $file) or log_fatal("Fail to open '$file': $!"); if($tag eq 'active') { my $header = "format = " . CACHE_FORMAT . " time = " . time() . " filter_re = $CONF{filter_re} cut_folders = $CONF{cut_folders} post_re = $CONF{post_re} add_folder = $CONF{add_folder} count = " . scalar(keys %$href) . "\n"; if(! print $fh $header) { my $err = $!; close $fh; unlink($file); log_fatal("Fail to print to '$file': $err"); } } while(my($name, $size) = each %$href) { if(! print $fh "$size\t$name\n") { my $err = $!; close $fh; unlink($file); log_fatal("Fail to print to '$file': $err"); } } if(! close $fh) { my $err = $!; unlink($file); log_fatal("Fail to print to '$file': $err"); } log_debug('Wrote', scalar(keys %$href), "entries to '$file'"); } sub read_active_cached($) { my $file = shift; open(my $fh, '-|', 'zcat', $file) or return; my $line = <$fh>; return if $line !~ /^format = (.+)/ or $1 ne CACHE_FORMAT; $line = <$fh>; return if $line !~ /^time = ([0-9]+)/ or $1 + $CONF{refresh_active} <= time(); $line = <$fh>; return if $line !~ /^filter_re = (.*)/ or $1 ne $CONF{filter_re}; $line = <$fh>; return if $line !~ /^cut_folders = (.*)/ or $1 != $CONF{cut_folders}; $line = <$fh>; return if $line !~ /^post_re = (.*)/ or $1 ne $CONF{post_re}; $line = <$fh>; return if $line !~ /^add_folder = (.*)/ or $1 ne $CONF{add_folder}; $line = <$fh>; return if $line !~ /^count = ([0-9]+)/; my $count = $1; while(my $line = <$fh>) { if(! chomp $line) { log_error("Broken file '$file': read line '$line'"); last; } my($size, $name) = split /\t/, $line, 2; $CONF{file_active}->{$name} = int($size); } close $fh; if($count != scalar(keys %{$CONF{file_active}})) { log_error('Read', scalar(keys %{$CONF{file_active}}), "from `zcat $file` instead of $count"); return; } log_debug("Read $count 'active' entries from archive"); $STAT{'cached active'} = $count; return 1; } sub read_active_files() { my $file = $CONF{conf_file} . '.active.gz'; if(-f $file) { return if read_active_cached($file); unlink($file); log_debug("Refresh active files"); } $CONF{file_added} = {}; $CONF{file_active} = {}; my $uri = 'file/list?format=text&fields=status,size,full_name&filter_status:ne=active'; if($CONF{add_folder}) { read_api($uri . "\&filter_folder:like=$CONF{add_folder}/%", \&cb_read_nonactive); read_api($uri . "\&filter_folder=$CONF{add_folder}", \&cb_read_nonactive); } else { read_api($uri, \&cb_read_nonactive); } $uri = 'file/list?format=text&fields=size,full_name&filter_status=active'; if($CONF{scan_max_age}) { $uri .= "\&filter_create_time:gt=" . (time()-$CONF{scan_max_age}); } if($CONF{add_folder}) { read_api($uri . "\&filter_folder:like=$CONF{add_folder}/%", \&cb_read_active); read_api($uri . "\&filter_folder=$CONF{add_folder}", \&cb_read_active); } else { read_api($uri, \&cb_read_active); } write_file_list('added', $CONF{file_added}); write_file_list('active', $CONF{file_active}); $file = $CONF{conf_file} . '.active'; system('nice', 'gzip', $file); if($?) { unlink($file, "$file.gz"); log_fatal("Fail to gzip '$file'"); } $STAT{'cached added'} = scalar(keys %{$CONF{file_added}}); $STAT{'cached active'} = scalar(keys %{$CONF{file_active}}); } sub read_added_files() { my $file = $CONF{conf_file} . '.added'; return if ! -f $file; open(my $fh, '<', $file) or log_fatal("Fail to open '$file': $!"); while(<$fh>) { chomp; my($size, $name) = split /\t/, $_, 2; delete $CONF{file_active}->{$name} if exists $CONF{file_active}->{$name}; if($size eq '-') { # deletion delete $CONF{file_added}->{$name} if exists $CONF{file_added}->{$name}; } else { $CONF{file_added}->{$name} = int($size); } } close $fh; log_debug("Read", scalar(keys %{$CONF{file_added}}), "'added' files from log"); } sub check_conf_update() { return if ! $CONF{monitor_conf_update}; my($ino, $size, $mtime) = (stat($CONF{conf_file}))[1,7,9]; if($ino != $CONF{conf_inode} or $size != $CONF{conf_size} or $mtime != $CONF{conf_mtime}) { log_debug("Updated conf with new ($ino, $size, $mtime) vs old ($CONF{conf_inode}, $CONF{conf_size}, $CONF{conf_mtime})"); log_info("Conf updated, exiting"); exit(0); } } sub rm_file($) { log_debug("rm_file(@_) [$CONF{cnt_rm}]"); check_conf_update(); return if $CONF{max_rm} and $CONF{cnt_rm} >= $CONF{max_rm}; my $file = $_[0]; my($f_dir, $f_name); if($file =~ m#^(.+)/([^/]+)$#) { ($f_dir, $f_name) = ($1, $2); } else { ($f_dir, $f_name) = ('', $file); } $f_dir = shell_escape("$CONF{scan_uri}/$f_dir/"); $f_name = shell_escape($f_name); if($CONF{scanner} eq 'rsync') { my $command = "cd /var/empty && rsync --delete-missing-args $f_name $f_dir"; log_debug($command); return if $DRY_RUN; my $ret = `$command 2>&1`; if($?) { log_error("Fail to delete file '$file' by `$command`: $ret"); } else { $CONF{cnt_rm} ++; log_action('rm', $file); } } elsif($CONF{scanner} ne 'rsync') { log_fatal("Not implemeted deletion on swift://"); } else { log_fatal("Unknown scanner type '$CONF{scanner}'"); } } sub verify_fetch($$) { my($location, $size) = @_; $CONF{fetch_verify} = 0; my $ret; if($location =~ /^rsync:\/\/(?:([^@]+)\@)?(\S+\/\S+\/.+)/) { my $uri = "rsync://$2"; my @env = ($ENV{USER}, $ENV{RSYNC_PASSWORD}); if($1) { ($ENV{USER}, $ENV{RSYNC_PASSWORD}) = split /:/, $1, 2; } my $command = 'rsync --list-only --no-h --timeout=10 ' . shell_escape($uri) . ' /var/empty/'; log_debug("Verify fetch by command `$command`:"); $ret = `$command 2>&1`; log_debug($ret); ($ENV{USER}, $ENV{RSYNC_PASSWORD}) = @env; if($ret =~ /^-((?:[r-][w-][x-]){3})\s+([0-9]+)\s+\S+\s+\S+\s+/m and $2 == $size and $1 =~ /r/) { return; } } elsif($location =~ /^https?:\/\//) { my $command = 'curl --head --silent --show-error ' . shell_escape($location); log_debug("Verify fetch by command `$command`"); $ret = `$command 2>&1`; log_debug($ret); if($ret =~ /^Content-Length:\s*([0-9]+)\r?$/im and $1 == $size) { return; } } elsif($location =~ /^s3:\/\/([0-9a-f]+):([0-9a-f]+)\@([^\/]+)(\/.+)/) { if($CONF{fetch_url} eq $CONF{scan_url}) { return; } return "Different s3:// scan and fetch urls"; } else { log_fatal("Don't know how to verify '$location' ($size)"); } finish(); log_fatal("Verify of '$location' ($size) fail: $ret"); } sub generate_signature($) { my $url = shift; my $uri = $url; if($uri !~ s/^https?:\/\/[^\/]+//) { log_fatal("Unable generate 'nginx_securelink' signature for '$url'"); } $uri =~ s/\?.+//; my($key_val); if($CONF{fetch_sign_method} eq 'nginx_securelink') { my $key = $CONF{fetch_sign_key} or log_fatal("Not set 'fetch_sign_key'"); my($arg_time, $expire); if($key =~ /\$arg_([a-z0-9]+)/) { $arg_time = $1; $expire = $CONF{fetch_sign_ttl} or log_fatal("Not set 'fetch_sign_ttl'"); $expire += time(); $key =~ s/\$arg_([a-z0-9]+)/$expire/; } if($key !~ s/\$uri/$uri/) { log_fatal("Wrong value of 'fetch_sign_key' - no '\$uri'"); } $key_val = md5_base64($key); $key_val =~ tr/+\/=/-_/d; $key_val .= "\&$arg_time=$expire" if $arg_time; } elsif($CONF{fetch_sign_method} eq 'swift_hmac_sha1') { my $expire = $CONF{fetch_sign_ttl} or log_fatal("Not set 'fetch_sign_ttl'"); $expire += time(); $key_val = hmac_sha1_hex("GET\n$expire\n$uri", $CONF{fetch_sign_key}); $key_val = "$key_val\&temp_url_expires=$expire"; } else { log_fatal("Unknown sign method '$CONF{fetch_sign_method}'"); } if($url =~ /\?/) { $url .= "\&$CONF{fetch_sign_securelink_argument}=$key_val"; } else { $url .= "?$CONF{fetch_sign_securelink_argument}=$key_val"; } log_debug("Generate signature: $url"); return $url; } sub add_file($$$) { my($orig_name, $f_name, $size) = @_; if($orig_name eq $f_name) { log_debug("add_file($f_name $size) [$CONF{cnt_add}]"); } else { log_debug("add_file(@_) [$CONF{cnt_add}]"); } check_conf_update(); if($CONF{max_add} and $CONF{cnt_add} >= $CONF{max_add}) { finish(); exit; } my %param; $param{size} = $size; $param{name} = $CONF{add_folder} ? "$CONF{add_folder}/$f_name" : $f_name; $param{fetch_prio} = $CONF{fetch_prio} if $CONF{fetch_prio}; if($CONF{callback}) { $param{callback} = $CONF{callback}; $param{callback} .= $param{name} if $param{callback} !~ s/%FILE%/$param{name}/; } $param{location} = "$CONF{fetch_url}$orig_name"; $param{location} = generate_signature($param{location}) if $CONF{fetch_sign_method}; log_debug("/file/add:", map { $_ => $param{$_} } sort keys %param); verify_fetch($param{location}, $size) if $CONF{fetch_verify}; return if $DRY_RUN; my $ret = call_api('file/add', %param); if($ret =~ /status:\s+ok/ or $ret =~ /already exists and have same size/ or $ret =~ /already exists in state '(?:blocked|disabled)'/) { $CONF{file_added}->{$f_name} = $size; log_action('add', $f_name, $size); $CONF{cnt_add} ++; update_procline(); return; } log_error("Unable to add file '$param{name}': $ret"); } sub del_file($) { log_debug("del_file(@_) [$CONF{cnt_del}]"); check_conf_update(); return if $CONF{max_del} and $CONF{cnt_del} >= $CONF{max_del}; my $f_name = shift; return if $DRY_RUN; my %param; $param{name} = $CONF{add_folder} ? "$CONF{add_folder}/$f_name" : $f_name; my $ret = call_api('file/delete', %param); if($ret =~ /status:\s+ok/) { log_action('delete', $f_name); $CONF{cnt_del} ++; } else { log_error("Unable to delete file '$f_name': $ret"); } } sub call_api($%) { my($method, %val) = @_; my $content = ''; while(my($param, $value) = each %val) { $content .= '&' if length($content) > 0; $content .= www_escape($param) . '=' . www_escape($value); } my $ua = Net::HTTP->new(Host => API_HOST, Connection => 'close'); if(! $ua->write_request(POST => "/api2/$method", 'User-Agent' => $CONF{api_ua_header}, 'Authorization' => $CONF{api_auth_header}, 'Content-Type' => 'application/x-www-form-urlencoded', $content)) { log_fatal("Fail to call API $method ($content): $!"); } ## TODO: trap exception in read_response_headers() my($code, $mess, @headers) = $ua->read_response_headers(); if($code ne '200') { log_fatal("Got '$code $mess' while calling API '$method' ($content): @headers"); } my $ret = ''; while(1) { my $buf; my $n = $ua->read_entity_body($buf, 65536); if(! defined $n) { log_fatal("Read error while calling API '$method' ($content) '$ret': $!"); } last if $n == 0; $ret .= $buf; } $ua->get_trailers; return $ret; } sub finish() { my $lock_file = $CONF{lock_file}; if(! utime(undef, undef, $lock_file)) { log_error("Fail to set mtime of '$lock_file': $!"); } for my $st (sort keys %STAT) { log_info("$st:\t", $STAT{$st}); } log_info('Complete in', time()-$CONF{start_time}, 'seconds'); if($STAT{'line processed'} == 0) { log_error('Empty origin'); } } sub process_origin() { setup(); read_active_files(); read_added_files(); open_scanner(); scan_origin(); finish(); } sub scan_origin() { my $fh = $CONF{scanner_fh}; if($CONF{scanner} eq 'rsync') { while(<$fh>) { log_debug('read line: ', $_) if EXTRA_DEBUG; next unless /^-(?:r[w-][x-]){3}\s+([0-9]+)\s+([0-9]{4})\/([0-9]{2})\/([0-9]{2}) ([0-9]{2}):([0-9]{2}):([0-9]{2}) (.+)/; process_file($8, $1, $2, $3, $4, $5, $6, $7); } } elsif($CONF{scanner} =~ /^(?:swift|s3)-short$/) { while(<$fh>) { log_debug('read line: ', $_) if EXTRA_DEBUG; if(/^\s*([0-9]+)\s+(.+)/) { process_file($2, $1, 1970, 1, 1, 0, 0, 0); } else { chomp; log_fatal("Unparseble line '$_'"); } } } elsif($CONF{scanner} =~ /^(?:swift|s3)-full$/) { while(<$fh>) { log_debug('read line: ', $_) if EXTRA_DEBUG; if(/^\s*([0-9]+)\s+([0-9]{4})-([0-9]{2})-([0-9]{2})\s+([0-9]{2}):([0-9]{2}):([0-9]{2})\.[0-9]{9}\s+(.+)/) { process_file($8, $1, $2, $3, $4, $5, $6, $7); } else { chomp; log_fatal("Unparseble line '$_'"); } } } else { log_fatal("Unknown scanner '$CONF{scanner}'"); } if(! close($CONF{scanner_fh})) { log_error("Error on closing scanner ($?): $!"); return; } if($CONF{delete_missing}) { while(my($f_name, undef) = each %{$CONF{file_active}}) { del_file($f_name); } } } sub process_file($$$$$$$$) { my ($f_name, $f_size, $year, $month, $mday, $hour, $min, $sec) = @_; log_debug("Process file '$f_name', size $f_size, $year-$month-$mday $hour:$min:$sec") if EXTRA_DEBUG; my $orig_name = $f_name; $STAT{'line processed'} ++; update_procline(); if($f_name =~ /[\\#?]/) { log_debug("Skip due non-printable/bad character: $f_name"); $STAT{'skip non-printable/bad char'} ++; return; } ## fast filters if($CONF{min_size} and $f_size < $CONF{min_size}) { log_debug("Skip '$f_name' due size $f_size < min_size=$CONF{min_size}"); $STAT{'skip size'} ++; return; } if($f_name !~ /\.([^.]{1,7})$/ or not exists $CONF{fetch_ext}->{$1}) { log_debug("Skip non valid ext '$1'") if EXTRA_DEBUG; $STAT{'skip ext'} ++; return; } if($f_name =~ /(^|\/)\.[^.]{1,7}$/) { log_debug("Skip empty file '$f_name'"); $STAT{'skip empty file'} ++; return; } if($CONF{scan_min_age} or $CONF{scan_max_age}) { my $f_time = timelocal($sec, $min, $hour, $mday, $month-1, $year); if($CONF{scan_min_age} and time() - $f_time < $CONF{scan_min_age}) { log_debug("Skip due min age $f_time") if EXTRA_DEBUG; $STAT{'skip min_age'} ++; return; } if($CONF{scan_max_age} and time() - $f_time > $CONF{scan_max_age}) { log_debug("Skip due max age $f_time") if EXTRA_DEBUG; $STAT{'skip max_age'} ++; return; } } if($CONF{filter_re_m}) { if($f_name !~ m#$CONF{filter_re_m}#) { log_debug("Skip due non-matching 'filter_re'") if EXTRA_DEBUG; $STAT{'skip filter_re'} ++; return; } } ## transform file name if($CONF{cut_folders}) { for (1 .. $CONF{cut_folders}) { # cut $CONF{cut_folder} folders from file name if($f_name !~ s/^[^\/]+\///) { # skip file if its name is too short log_debug("Cut $_/$CONF{cut_folders} folders") if EXTRA_DEBUG; $STAT{'skip short dir'} ++; return; } log_debug("Cut to '$f_name'") if EXTRA_DEBUG; } if($CONF{cut_folders} < 0) { # cut all folders $f_name =~ s/.+\///; log_debug("Cut all folders to '$f_name'") if EXTRA_DEBUG; } } if($CONF{post_re}) { eval "\$f_name =~ $CONF{post_re}"; log_debug("Applied 'post_re' to '$f_name'") if EXTRA_DEBUG; } ## check if file is already imported or in progress of importing if(exists $CONF{file_added}->{$f_name}) { if($CONF{file_added}->{$f_name} == $f_size) { log_debug("Skip '$f_name' due already importing") if EXTRA_DEBUG; $STAT{'already added'} ++; update_procline(); return; } log_debug("File '$f_name' is already importing with size '$CONF{file_added}->{$f_name}'"); } elsif(exists $CONF{file_active}->{$f_name}) { if($CONF{file_active}->{$f_name} == $f_size) { log_debug("Already imported '$f_name' with size '$f_size'") if EXTRA_DEBUG; if($CONF{delete_imported}) { if($f_name eq $orig_name) { log_debug("Delete already imported '$f_name'"); } else { log_debug("Delete already imported as '$f_name' file '$orig_name'"); } rm_file($orig_name); } delete $CONF{file_active}->{$f_name}; $STAT{'already imported'} ++; update_procline(); return; } log_debug("Already imported '$f_name' with size '$CONF{file_active}->{$f_name}'"); delete $CONF{file_active}->{$f_name}; } else { # not imported and not importing if($CONF{update_only}) { log_debug("Not imported '$f_name', skip due 'update_only'") if EXTRA_DEBUG; $STAT{'skip not imported'} ++; return; } else { log_debug("Not imported '$f_name', size $f_size, $year-$month-$mday $hour:$min:$sec"); } } $STAT{'not imported'} ++; add_file($orig_name, $f_name, $f_size); }